
<html>
<body bgcolor="#FFFFFF">
This package supports an efficient multi-point network monitoring 
infrastructure for collecting streaming data from many monitors.

<center>
<a name="protocol"><h2><i>Simple Streaming Data Protocol</i></h2></a>
</center>

<ul>
<li> A <b><i>record stream</i></b> is a sequence of bytes, consisting of  
a <b><i>preamble</i></b> followed by a <b><i>body</i></b>.<p>
<li> The <b><i>preamble</i></b> consists of two Strings in UTF format 
and Java byte order (big-endian).  <p>
 <ul>
 <li>If the first String is "record" then the second String names the stream, 
     and the body consists of zero or more <b><i>records</i></b>.   <p>
 <li>If the first String is anything other than "record" the subsequent  
 String and stream body are undefined. <p>
 </ul><p>
<li> Each <b><i>record</i></b> consists of the following fields, written  
in Java byte order and encoded according to Java's standard primitive data 
type serialization rules: <p>

<table border=1>
<tr><td>Bytes</td><td>Java Type</td><td>Interpretation</td></tr>
<tr><td>0..3</td><td>int</td><td>Service code (record data type)</td></tr>
<tr><td>4..7</td><td>int</td><td>Source code (writer identification)</td></tr>
<tr><td>8..15</td><td>double</td><td>Timestamp (seconds since epoch)</td></tr>
<tr><td>16..19</td><td>int</td><td>Record length (bytes to follow)</td></tr>
<tr><td>20..end</td><td>bytes</td><td>User-defined data</td></tr>
</table><p>

<li>The integer codes used to identify record types and origins are  
generated uniquely for each stream, and cannot be relied upon to remain
the same, even across subsequent runs of the same code.  Their mapping  
to strings is performed inline within the stream, using special dynamic 
dictionary-building record types embedded in the stream.  These codes 
are portably obtained using the <code>getRecordTypeCode(String)</code> and 
<code>getRecordSourceCode(String)</code> methods,
and resolved using the <code>getRecordTypeString(int)</code> and 
<code>getRecordSourceString(int)</code> 
methods, all specified in <code>interface StreamInterface</code>.<p>
</ul>
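The layout above can be sketched with the standard <code>java.io</code> data streams, which write big-endian values. This is a minimal illustration, not the package's own code: the class <code>RecordWireSketch</code> and its method names are hypothetical, and it assumes that "UTF format" means the encoding produced by <code>DataOutputStream.writeUTF</code>. It writes a preamble and a single record, then reads them back; the in-stream dictionary records are omitted.<p>

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical helper for illustration; not part of SSF.Util.Streams. */
class RecordWireSketch {

    /** Encodes a preamble followed by exactly one record. */
    static byte[] encode(String streamName, int typeCode, int sourceCode,
                         double timestamp, byte[] payload) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);

            // Preamble: the literal "record", then the stream name.
            // Assumption: "UTF format" is the writeUTF encoding.
            out.writeUTF("record");
            out.writeUTF(streamName);

            // Record header and body, in table order.
            out.writeInt(typeCode);        // bytes 0..3:   service code
            out.writeInt(sourceCode);      // bytes 4..7:   source code
            out.writeDouble(timestamp);    // bytes 8..15:  timestamp
            out.writeInt(payload.length);  // bytes 16..19: record length
            out.write(payload);            // bytes 20..end: user data

            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Decodes the preamble and the first record into a printable summary. */
    static String decodeFirst(byte[] stream) {
        try {
            DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(stream));

            String kind = in.readUTF();
            if (!"record".equals(kind)) {
                // The protocol leaves every other stream kind undefined.
                throw new IOException("unsupported stream kind: " + kind);
            }
            String name = in.readUTF();

            int typeCode = in.readInt();
            int sourceCode = in.readInt();
            double timestamp = in.readDouble();
            int length = in.readInt();
            byte[] payload = new byte[length];
            in.readFully(payload);

            return name + " type=" + typeCode + " source=" + sourceCode
                 + " time=" + timestamp + " bytes=" + length;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the header is a fixed 20 bytes and carries the payload length, a reader can skip records of types it does not understand without parsing their contents.<p>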

<hr>

<center><h2><i>SSF.Util.Streams</i></h2></center><p>

The <b>Streams</b> package contains three classes and one interface for 
very basic record-oriented streaming data export from SSFNet simulations.  
They implement the simple streaming data protocol described  
<a href="#protocol">above</a>.

<ul>

<p><li><h3>class streamException extends Exception</h3>
<quote>
  Simple exception class that can be thrown by stream setup methods. 
</quote>

<p><li><h3>class BasicRecorder implements StreamInterface</h3>
<quote>
   BasicRecorder demonstrates how to build a simple implementation 
   of a StreamInterface for portably emitting a stream of records. 
</quote>
<p><li><h3>class BasicPlayer implements StreamInterface</h3>
<quote>
   BasicPlayer demonstrates how to build a simple implementation 
   of a StreamInterface for portably processing a stream of records. 
</quote>

<p><p><li><h3>interface StreamInterface</h3>
<quote>
 Interface for sending and/or receiving a stream of records, each indexed 
 by a small standard header.  This header identifies the type of each 
 record, the writer of the record, the time at which the record was 
 generated, and the number of bytes to follow in a user-defined format.   
 The type and writer are given as integer codes, which correspond 
 to arbitrary-length strings sent in-stream to construct a 
 pair of queryable dynamic data dictionaries.  See the description of 
 the <a href="#protocol">simple streaming data protocol</a> above for more details. 
 The interface specifies the following operations:<p>

 <quote>
    Connect the stream to a data sink or source at the given URL, 
    throwing a streamException if there are any problems: 
 </quote>
 <pre>
    public void connectWrite(String url) throws streamException;

    public void connectRead(String url) throws streamException;
 </pre><p>
 <quote>
     Return true if this stream has been successfully connected 
     to a data source or sink, and not disconnected: 
 </quote>
 <pre>
    public boolean isConnected();
 </pre><p>
 <quote>
     Signal that no more records are to be received (if reading)
     or sent (if writing): 
 </quote>
 <pre>
    public void disconnect();
 </pre><p>
 <quote> 
    Process a single incoming record in a data stream connected for reading:
 </quote>
 <pre>
    public int receive(int type_code,
		       int source_code,
		       double timestamp, 
		       byte[] bytes, 
		       int offset,
		       int length);
 </pre><p>
 <quote> 
    Emit a single record on a data stream connected for writing, returning
    zero if the record is successfully emitted, or a nonzero value if 
    there is an error or if a filter has suppressed the record from being 
    written.  The short form (without payload) may be used to test whether 
    a record will be emitted or suppressed, to save the overhead of actually
    preparing it for transmission: 
 </quote>
 <pre>
    public int send(int type_code,
		    int source_code,
		    double timestamp); 

    public int send(int type_code,
		    int source_code,
		    double timestamp, 
		    byte[] bytes, 
		    int offset,
		    int length);  // long form
 </pre><p>
 <quote>
    Map a user-defined record type string to an integer code, or vice versa:
 </quote>
 <pre>
    public String getRecordTypeString(int code); 

    public int getRecordTypeCode(String name); 
 </pre><p>
 <quote>
    Map a user-defined sender ID string to an integer code, or vice versa:
 </quote>
 <pre>
    public String getRecordSourceString(int code); 

    public int getRecordSourceCode(String name); 
 </pre><p>
</quote>
</ul><p>
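The string-to-code mapping behind these four methods can be pictured as a small two-way dictionary that hands out the next unused integer the first time it sees a name. The sketch below is an in-memory illustration only: <code>CodeDictionarySketch</code> is a hypothetical class, returning <code>null</code> for an unknown code is an assumption, and a real stream would also emit a dictionary record in-stream whenever a new code is assigned.<p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical two-way dictionary; not the SSF.Util.Streams implementation. */
class CodeDictionarySketch {

    private final Map<String, Integer> codesByName = new HashMap<>();
    private final List<String> namesByCode = new ArrayList<>();

    /** Returns the code for a name, assigning the next unused code on first use. */
    int getCode(String name) {
        Integer known = codesByName.get(name);
        if (known != null) {
            return known;
        }
        int code = namesByCode.size();   // codes are dense: 0, 1, 2, ...
        codesByName.put(name, code);
        namesByCode.add(name);
        return code;
    }

    /** Returns the name for a code, or null if the code was never assigned (assumption). */
    String getString(int code) {
        if (code < 0 || code >= namesByCode.size()) {
            return null;
        }
        return namesByCode.get(code);
    }
}
```

Since codes are assigned in order of first use, two runs that register names in a different order produce different codes, which is why callers should always look codes up by string rather than hard-coding them.<p>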
To write a stream of records, the user typically constructs a BasicRecorder, 
connects it to a data sink, and calls send() repeatedly to emit records 
before calling disconnect().  In the example below, the sender first calls 
the short form of send(), with no payload, to test stream status before 
committing to the overhead of preparing the payload bytes: there is no 
sense preparing bytes that the stream will drop or suppress.<p>
<pre>
      StreamInterface myRecorder = new BasicRecorder("this names my stream");

      myRecorder.connectWrite("file:/tmp/stream.dat");

      int tid = myRecorder.getRecordTypeCode("my record type");
      int sid = myRecorder.getRecordSourceCode("my writer id");

      double now = .. ; // get timestamp from simulator or clock

      if (0 == myRecorder.send(tid,sid,now)) {  // test for suppression

        byte[] mybuffer = .. ; // prepare the bytes to be emitted
        myRecorder.send(tid,sid,now, mybuffer,0,mybuffer.length);

      }

      // .. do more sends until finished ..

      myRecorder.disconnect();

</pre><p>
To read the records later, the user typically constructs a BasicPlayer
and connects it to a data source; the BasicPlayer calls back receive() 
each time a record arrives: 
<pre>
      /** For this example only: use anonymous inner class BasicPlayer 
        * to demonstrate specialized record processing.  We override 
        * the default behavior for one type of record, and defer to the 
        * base class default for all other types of records. 
        */
      StreamInterface myPlayer = new BasicPlayer("this names my stream") {

        public int receive(int tid, int sid, double time, 
                           byte[] buf, int offset, int length) {

           if (tid == getRecordTypeCode("my record type")) {

             // .. process this record content appropriately 

             return 0; // assumption: zero signals success, mirroring send()
           }

           else return super.receive(tid,sid,time,buf,offset,length);
        }
      };

      myPlayer.connectRead("file:/tmp/stream.dat"); // calls back receive()..

</pre>
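The payload itself is opaque to the stream, so writer and reader must agree on its layout. One possible approach, sketched below, packs the fields with <code>DataOutputStream</code> before calling send() and unpacks them inside receive() using the <code>offset</code> and <code>length</code> arguments. The class <code>PayloadSketch</code> and the two fields (a packet count and a mean delay) are hypothetical and chosen only for illustration.<p>

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical payload codec; the field choice is illustrative only. */
class PayloadSketch {

    /** Packs the fields into the byte[] that would be handed to send(). */
    static byte[] pack(int packetCount, double meanDelay) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeInt(packetCount);    // 4 bytes
            out.writeDouble(meanDelay);   // 8 bytes
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Unpacks the fields from the slice that receive() is given. */
    static String unpack(byte[] buf, int offset, int length) {
        try {
            // Read only the slice [offset, offset + length), since the
            // buffer may hold more than this one payload.
            DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(buf, offset, length));
            int packetCount = in.readInt();
            double meanDelay = in.readDouble();
            return "packets=" + packetCount + " meanDelay=" + meanDelay;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On the writing side the result of <code>pack</code> would take the place of <code>mybuffer</code>; on the reading side <code>unpack(buf, offset, length)</code> would be called from the overridden receive().<p>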

<hr>

<center><h2><i>SSF.OS.ProbeSession</i></h2></center><p>

Finally, one new SSFNet class makes it easier to manage record streams 
in a multitimeline context.  Configure an instance of the 
<code>ProbeSession</code> protocol under the standard name "probe" in 
each host or router where probing is to be enabled:<p>
<pre>
    ProtocolGraph [
      # .. traditional protocols here 
 
      ProtocolSession [
        name probe use SSF.OS.ProbeSession
        file "/tmp/mystream.dat"
        stream "My Stream"
      ]
    ]
</pre><p>
Then, from any protocol or protocol-related code, access the "probe"
protocol and call <code>getRecorder</code> to get a handle on an 
implementation of <code>StreamInterface</code> suitable for sending 
records:<p> 
<pre>
     ProbeSession theProbe = 
        (ProbeSession)inGraph().SessionForName("probe");

     StreamInterface theStream = theProbe.getRecorder(); // preconnected
         
     int myHostCode = theProbe.getHostCode();  // uses the NHI address 

     int myDatatypeCode = theStream.getRecordTypeCode("my record type");

     theStream.send(myDatatypeCode,myHostCode,now()/Net.frequency(),
                    myBytes, 0, myBytes.length);
</pre>

The streams accessed in this way are managed by the system-wide collection
of ProbeSessions; they are automatically disconnected at the end of 
simulation time.  The configured stream names and file names are not used 
directly; each ProbeSession appends a dot (".") and the integer ID of its 
local timeline to both.  In the following example, in a four-timeline model, 
four streams are actually created behind the scenes:  <p>
<pre>
    ProtocolSession [
      name probe use SSF.OS.ProbeSession
      file "/tmp/mystream.dat"
      stream "My Stream"
    ]

    #
    # Streams created: "My Stream.0" "My Stream.1" "My Stream.2" "My Stream.3"
    # 
    # Files created: /tmp/mystream.dat.[0-3]
    #
</pre>
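The naming rule can be written down in a few lines. The helper below is a hypothetical illustration of the suffixing described above, not code taken from <code>ProbeSession</code>; it assumes timeline IDs run from zero up to one less than the number of timelines, as in the four-timeline example.<p>

```java
/** Hypothetical illustration of per-timeline naming; not ProbeSession code. */
class TimelineNamingSketch {

    /** Appends a dot and the timeline ID to a configured stream or file name. */
    static String perTimelineName(String configured, int timelineId) {
        return configured + "." + timelineId;
    }

    /** Lists the names produced for timelines 0 .. timelineCount-1 (assumed ID range). */
    static String[] allNames(String configured, int timelineCount) {
        String[] names = new String[timelineCount];
        for (int id = 0; id < timelineCount; id++) {
            names[id] = perTimelineName(configured, id);
        }
        return names;
    }
}
```

A reader that wants the complete picture of a run therefore has to open every per-timeline file and, if global ordering matters, merge the records by timestamp.<p>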

</body>
</html>
